/*-------------------------------------------------------------------------
 *
 * xloginsert.c
 *		Functions for constructing WAL records
 *
 * Constructing a WAL record begins with a call to XLogBeginInsert,
 * followed by a number of XLogRegister* calls. The registered data is
 * collected in private working memory, and finally assembled into a chain
 * of XLogRecData structs by a call to XLogRecordAssemble(). See
 * access/transam/README for details.
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/backend/access/transam/xloginsert.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/xact.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "access/xloginsert.h"
#include "catalog/pg_control.h"
#include "common/pg_lzcompress.h"
#include "executor/instrument.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "replication/origin.h"
#include "storage/bufmgr.h"
#include "storage/proc.h"
#include "storage/spin.h"
#include "utils/memutils.h"
#include "access/heapam_xlog.h"
#include "access/nbtxlog.h"
#include "access/nbtree.h"
#include "access/gistxlog.h"
#include "access/gist_private.h"
#include "access/spgxlog.h"
#include "access/brin_xlog.h"

/* Buffer size required to store a compressed version of backup block image */
#define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)

/*
 * For each block reference registered with XLogRegisterBuffer, we fill in
 * a registered_buffer struct.
 */
typedef struct
{
	bool		in_use;			/* is this slot in use? */
	uint8		flags;			/* REGBUF_* flags */
	RelFileNode rnode;			/* identifies the relation and block */
	ForkNumber	forkno;			/* fork containing the registered block */
	BlockNumber block;			/* block number within that fork */
	Page		page;			/* page content */
	uint32		rdata_len;		/* total length of data in rdata chain */
	XLogRecData *rdata_head;	/* head of the chain of data registered with
								 * this block */
	XLogRecData *rdata_tail;	/* last entry in the chain, or &rdata_head if
								 * empty */

	XLogRecData bkp_rdatas[2];	/* temporary rdatas used to hold references to
								 * backup block data in XLogRecordAssemble() */

	/* buffer to store a compressed version of backup block image */
	char		compressed_page[PGLZ_MAX_BLCKSZ];
} registered_buffer;

static registered_buffer *registered_buffers;
static int	max_registered_buffers; /* allocated size */
static int	max_registered_block_id = 0;	/* highest block_id + 1 currently
											 * registered */

/*
 * Shared state for group-assembly of WAL records (see
 * DecodeXLogRecordAssemble below).  NOTE(review): "grouo" looks like a typo
 * for "group", but these symbols have external linkage and may be referenced
 * from other translation units, so the names are left unchanged here.
 */
int 	group_total_len;		/* total length of all records in the group */
int    grouo_rec_count;			/* number of per-block records assembled */
int    grouo_rec_cur_count;		/* index of the record currently being emitted */

/* One slot per possible block id: record headers, their lengths, the head of
 * each record's rdata chain, and each record's end LSN. */
XLogRecord *grouphead[XLR_MAX_BLOCK_ID + 1];
int     grouplens[XLR_MAX_BLOCK_ID + 1];
XLogRecData groupRecData[XLR_MAX_BLOCK_ID + 1];
XLogRecPtr groupEndLsn[XLR_MAX_BLOCK_ID + 1];

/*
 * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
 * with XLogRegisterData(...).
 */
static XLogRecData *mainrdata_head;
static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
static uint32 mainrdata_len;	/* total # of bytes in chain */

/* flags for the in-progress insertion */
static uint8 curinsert_flags = 0;

/*
 * These are used to hold the record header while constructing a record.
 * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
 * because we want it to be MAXALIGNed and padding bytes zeroed.
 *
 * For simplicity, it's allocated large enough to hold the headers for any
 * WAL record.
 */
//static XLogRecData hdr_rdt;
static char *hdr_scratch = NULL;

#define SizeOfXlogOrigin	(sizeof(RepOriginId) + sizeof(char))
#define SizeOfXLogTransactionId	(sizeof(TransactionId) + sizeof(char))

#define HEADER_SCRATCH_SIZE \
	((SizeOfXLogRecord + \
	 MaxSizeOfXLogRecordBlockHeader + \
	 SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
	 SizeOfXLogTransactionId) * (XLR_MAX_BLOCK_ID + 1))

#define SINGLE_SCRATCH_SIZE \
	(SizeOfXLogRecord + \
	 MaxSizeOfXLogRecordBlockHeader + \
	 SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
	 SizeOfXLogTransactionId)

/*
 * An array of XLogRecData structs, to hold registered data.
 */
static XLogRecData *rdatas;
static int	num_rdatas;			/* entries currently used */
static int	max_rdatas;			/* allocated size */

static bool begininsert_called = false;

/* Memory context to hold the registered buffer and data references. */
static MemoryContext xloginsert_cxt;

static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
									   XLogRecPtr RedoRecPtr, bool doPageWrites,
									   XLogRecPtr *fpw_lsn, int *num_fpi);
static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
									uint16 hole_length, char *dest, uint16 *dlen);

/*
 * Begin constructing a WAL record. This must be called before the
 * XLogRegister* functions and XLogInsert().
 */
void
XLogBeginInsert(void)
{
	/*
	 * The working state must be clean: no block references and no main
	 * data left over.  XLogResetInsertion() is responsible for restoring
	 * this state after every insertion.
	 */
	Assert(max_registered_block_id == 0);
	Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
	Assert(mainrdata_len == 0);

	/* Generating new WAL is forbidden while in recovery. */
	if (!XLogInsertAllowed())
		elog(ERROR, "cannot make new WAL entries during recovery");

	/* A nested or duplicate begin call is a caller bug. */
	if (begininsert_called)
		elog(ERROR, "XLogBeginInsert was already called");

	begininsert_called = true;
}

/*
 * Ensure that there are enough buffer and data slots in the working area,
 * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
 * calls.
 *
 * There is always space for a small number of buffers and data chunks, enough
 * for most record types. This function is for the exceptional cases that need
 * more.
 */
void
XLogEnsureRecordSpace(int max_block_id, int ndatas)
{
	int			nbuffers;

	/*
	 * This must be called before entering a critical section, because
	 * allocating memory inside a critical section can fail. repalloc() will
	 * check the same, but better to check it here too so that we fail
	 * consistently even if the arrays happen to be large enough already.
	 */
	Assert(CritSectionCount == 0);

	/* the minimum values can't be decreased */
	if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
		max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
	if (ndatas < XLR_NORMAL_RDATAS)
		ndatas = XLR_NORMAL_RDATAS;

	if (max_block_id > XLR_MAX_BLOCK_ID)
		elog(ERROR, "maximum number of WAL record block references exceeded");
	nbuffers = max_block_id + 1;

	if (nbuffers > max_registered_buffers)
	{
		registered_buffers = (registered_buffer *)
			repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);

		/*
		 * At least the padding bytes in the structs must be zeroed, because
		 * they are included in WAL data, but initialize it all for tidiness.
		 */
		MemSet(&registered_buffers[max_registered_buffers], 0,
			   (nbuffers - max_registered_buffers) * sizeof(registered_buffer));
		max_registered_buffers = nbuffers;
	}

	if (ndatas > max_rdatas)
	{
		rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
		max_rdatas = ndatas;
	}
}

/*
 * Reset WAL record construction buffers.
 */
void
XLogResetInsertion(void)
{
	int			blkno;

	/* reset the subxact assignment flag (if needed) */
	if (curinsert_flags & XLOG_INCLUDE_XID)
		MarkSubTransactionAssigned();

	/* Release every block slot that the aborted/finished record used. */
	for (blkno = 0; blkno < max_registered_block_id; blkno++)
		registered_buffers[blkno].in_use = false;

	/* Return all working state to the pristine condition that
	 * XLogBeginInsert() asserts. */
	max_registered_block_id = 0;
	num_rdatas = 0;
	mainrdata_len = 0;
	mainrdata_last = (XLogRecData *) &mainrdata_head;
	curinsert_flags = 0;
	begininsert_called = false;
}

/*
 * Register a reference to a buffer with the WAL record being constructed.
 * This must be called for every page that the WAL-logged operation modifies.
 */
void
XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
{
	registered_buffer *rb;

	/* NO_IMAGE doesn't make sense with FORCE_IMAGE */
	Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
	Assert(begininsert_called);

	/* Grow the high-water mark, validating against the allocated slots. */
	if (block_id >= max_registered_block_id)
	{
		if (block_id >= max_registered_buffers)
			elog(ERROR, "too many registered buffers");
		max_registered_block_id = block_id + 1;
	}

	rb = &registered_buffers[block_id];

	/* Record the buffer's identity and reset its per-record data chain. */
	BufferGetTag(buffer, &rb->rnode, &rb->forkno, &rb->block);
	rb->page = BufferGetPage(buffer);
	rb->flags = flags;
	rb->rdata_len = 0;
	rb->rdata_tail = (XLogRecData *) &rb->rdata_head;

#ifdef USE_ASSERT_CHECKING
	/* Verify that no other block_id already refers to this same page. */
	{
		int			other;

		for (other = 0; other < max_registered_block_id; other++)
		{
			registered_buffer *prev = &registered_buffers[other];

			if (other == block_id || !prev->in_use)
				continue;

			Assert(!RelFileNodeEquals(prev->rnode, rb->rnode) ||
				   prev->forkno != rb->forkno ||
				   prev->block != rb->block);
		}
	}
#endif

	rb->in_use = true;
}

//new function
RelFileNode
XLogRegisterBufferExtend(uint8 block_id, Buffer buffer, uint8 flags)
{
	registered_buffer *regbuf;

	/* NO_IMAGE doesn't make sense with FORCE_IMAGE */
	Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
	Assert(begininsert_called);

	if (block_id >= max_registered_block_id)
	{
		if (block_id >= max_registered_buffers)
			elog(ERROR, "too many registered buffers");
		max_registered_block_id = block_id + 1;
	}

	regbuf = &registered_buffers[block_id];

	BufferGetTag(buffer, &regbuf->rnode, &regbuf->forkno, &regbuf->block);
	regbuf->page = BufferGetPage(buffer);
	regbuf->flags = flags;
	regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
	regbuf->rdata_len = 0;

	/*
     * Check that this page hasn't already been registered with some other
     * block_id.
     */
#ifdef USE_ASSERT_CHECKING
	{
		int			i;

		for (i = 0; i < max_registered_block_id; i++)
		{
			registered_buffer *regbuf_old = &registered_buffers[i];

			if (i == block_id || !regbuf_old->in_use)
				continue;

			Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
				   regbuf_old->forkno != regbuf->forkno ||
				   regbuf_old->block != regbuf->block);
		}
	}
#endif

	regbuf->in_use = true;
	return regbuf->rnode;
}

/*
 * Like XLogRegisterBuffer, but for registering a block that's not in the
 * shared buffer pool (i.e. when you don't have a Buffer for it).
 */
void
XLogRegisterBlock(uint8 block_id, RelFileNode *rnode, ForkNumber forknum,
				  BlockNumber blknum, Page page, uint8 flags)
{
	registered_buffer *rb;

	/* This is currently only used to WAL-log a full-page image of a page */
	Assert(flags & REGBUF_FORCE_IMAGE);
	Assert(begininsert_called);

	if (block_id >= max_registered_block_id)
		max_registered_block_id = block_id + 1;

	if (block_id >= max_registered_buffers)
		elog(ERROR, "too many registered buffers");

	rb = &registered_buffers[block_id];

	/* The caller supplies the tag directly; no Buffer is involved here. */
	rb->rnode = *rnode;
	rb->forkno = forknum;
	rb->block = blknum;
	rb->page = page;
	rb->flags = flags;
	rb->rdata_len = 0;
	rb->rdata_tail = (XLogRecData *) &rb->rdata_head;

#ifdef USE_ASSERT_CHECKING
	/* Verify that no other block_id already refers to this same page. */
	{
		int			other;

		for (other = 0; other < max_registered_block_id; other++)
		{
			registered_buffer *prev = &registered_buffers[other];

			if (other == block_id || !prev->in_use)
				continue;

			Assert(!RelFileNodeEquals(prev->rnode, rb->rnode) ||
				   prev->forkno != rb->forkno ||
				   prev->block != rb->block);
		}
	}
#endif

	rb->in_use = true;
}

/*
 * Add data to the WAL record that's being constructed.
 *
 * The data is appended to the "main chunk", available at replay with
 * XLogRecGetData().
 */
void
XLogRegisterData(char *data, int len)
{
	XLogRecData *rdata;

	Assert(begininsert_called);

	if (num_rdatas >= max_rdatas)
		elog(ERROR, "too much WAL data");
	rdata = &rdatas[num_rdatas++];

	rdata->data = data;
	rdata->len = len;

	/*
	 * we use the mainrdata_last pointer to track the end of the chain, so no
	 * need to clear 'next' here.
	 */

	mainrdata_last->next = rdata;
	mainrdata_last = rdata;

	mainrdata_len += len;
}

/*
 * Add buffer-specific data to the WAL record that's being constructed.
 *
 * Block_id must reference a block previously registered with
 * XLogRegisterBuffer(). If this is called more than once for the same
 * block_id, the data is appended.
 *
 * The maximum amount of data that can be registered per block is 65535
 * bytes. That should be plenty; if you need more than BLCKSZ bytes to
 * reconstruct the changes to the page, you might as well just log a full
 * copy of it. (the "main data" that's not associated with a block is not
 * limited)
 */
/*
 * Add buffer-specific data to the WAL record that's being constructed
 * (appending to any data previously registered for the same block_id).
 *
 * The per-block limit of 65535 bytes documented above is now enforced: the
 * on-disk XLogRecordBlockHeader stores the per-block data length in a
 * uint16, so exceeding it would previously have been silently truncated at
 * record-assembly time.  A negative len is rejected for the same reason.
 */
void
XLogRegisterBufData(uint8 block_id, char *data, int len)
{
	registered_buffer *regbuf;
	XLogRecData *rdata;

	Assert(begininsert_called);

	/* find the registered buffer struct */
	regbuf = &registered_buffers[block_id];
	if (!regbuf->in_use)
		elog(ERROR, "no block with id %d registered with WAL insertion",
			 block_id);

	/*
	 * Check against max_rdatas, and ensure we don't register more data per
	 * buffer than the physical record format can represent (the block
	 * header's data_length field is a uint16).
	 */
	if (num_rdatas >= max_rdatas ||
		len < 0 ||
		regbuf->rdata_len + (uint32) len > UINT16_MAX)
		elog(ERROR, "too much WAL data");
	rdata = &rdatas[num_rdatas++];

	rdata->data = data;
	rdata->len = len;

	regbuf->rdata_tail->next = rdata;
	regbuf->rdata_tail = rdata;
	regbuf->rdata_len += len;
}

/*
 * Return a pointer to the block number recorded for the given block_id.
 * NOTE(review): "Bock" looks like a typo for "Block", but the name is an
 * exported symbol, so it is kept for compatibility with existing callers.
 */
BlockNumber* GetBockNumber(uint8 block_id)
{
	registered_buffer *rb = &registered_buffers[block_id];

	return &rb->block;
}

/* Return a pointer to the RelFileNode recorded for the given block_id. */
RelFileNode* GetRnode(uint8 block_id) {
	registered_buffer *rb = &registered_buffers[block_id];

	return &rb->rnode;
}



/*
 * Set insert status flags for the upcoming WAL record.
 *
 * The flags that can be used here are:
 * - XLOG_INCLUDE_ORIGIN, to determine if the replication origin should be
 *	 included in the record.
 * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
 *	 durability, which allows to avoid triggering WAL archiving and other
 *	 background activity.
 * - XLOG_INCLUDE_XID, a message-passing hack between XLogRecordAssemble
 *	 and XLogResetInsertion.
 */
void
XLogSetRecordFlags(uint8 flags)
{
	Assert(begininsert_called);

	/* Flags accumulate across calls; XLogResetInsertion() clears them. */
	curinsert_flags = curinsert_flags | flags;
}

/*
 * Insert an XLOG record having the specified RMID and info bytes, with the
 * body of the record being the data and buffer references registered earlier
 * with XLogRegister* calls.
 *
 * Returns XLOG pointer to end of record (beginning of next record).
 * This can be used as LSN for data pages affected by the logged action.
 * (LSN is the XLOG point up to which the XLOG must be flushed to disk
 * before the data page can be written out.  This implements the basic
 * WAL rule "write the log before the data".)
 */
XLogRecPtr
XLogInsert(RmgrId rmid, uint8 info)
{
	XLogRecPtr	EndPos;

	/* XLogBeginInsert() must have been called. */
	if (!begininsert_called)
		elog(ERROR, "XLogBeginInsert was not called");

	/*
	 * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
	 * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
	 */
	if ((info & ~(XLR_RMGR_INFO_MASK |
				  XLR_SPECIAL_REL_UPDATE |
				  XLR_CHECK_CONSISTENCY)) != 0)
		elog(PANIC, "invalid xlog info mask %02X", info);

	TRACE_POSTGRESQL_WAL_INSERT(rmid, info);

	/*
	 * In bootstrap mode, we don't actually log anything but XLOG resources;
	 * return a phony record pointer.
	 */
	if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
	{
		XLogResetInsertion();
		EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
		return EndPos;
	}
	
	
	/*
	 * Assemble and insert the record, retrying from scratch whenever
	 * He3DBXLogInsertRecord() reports InvalidXLogRecPtr.  NOTE(review):
	 * presumably that happens when the full-page-write decision taken
	 * below became stale before the insertion lock was acquired (as in
	 * upstream XLogInsertRecord) -- confirm against He3DBXLogInsertRecord.
	 */
	do
	{
		XLogRecPtr	RedoRecPtr;
		bool		doPageWrites;
		XLogRecPtr	fpw_lsn;
		XLogRecData *rdt;
		int			num_fpi = 0;

		/*
		 * Get values needed to decide whether to do full-page writes. Since
		 * we don't yet have an insertion lock, these could change under us,
		 * but XLogInsertRecord will recheck them once it has a lock.
		 */
		GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);

		rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
								 &fpw_lsn, &num_fpi);
		EndPos = He3DBXLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi);
	} while (EndPos == InvalidXLogRecPtr);
	/* Clear the working state for the next record. */
	XLogResetInsertion();

	return EndPos;
}

/* Scratch rdata slots used when re-assembling decoded records: up to two
 * entries (backup image + block data) per possible block id, plus one chunk
 * for the record's main data. */
static XLogRecData g_bkp_rdatas[XLR_MAX_BLOCK_ID + 1][2];
static XLogRecData g_main_data;

/*
 * Ensure state->main_data has room for state->main_data_len + 64 bytes.
 *
 * NOTE(review): when a reallocation happens, the existing buffer contents
 * are DISCARDED (pfree followed by palloc, no copy).  Callers that need to
 * keep the old payload must save it aside first, as the SP-GiST paths in
 * convertMainData() do.  The fixed 64-byte headroom is adequate only
 * because every converted main-data struct is small relative to the
 * BLCKSZ/2 minimum allocation below -- confirm before adding new callers.
 */
static void extendMainData(XLogReaderState *state) {
    int extendSize = 64;
    if (state->main_data_len + extendSize > state->main_data_bufsz)
    {

        if (state->main_data)
            pfree(state->main_data);

        /*
         * main_data_bufsz must be MAXALIGN'ed.  In many xlog record
         * types, we omit trailing struct padding on-disk to save a few
         * bytes; but compilers may generate accesses to the xlog struct
         * that assume that padding bytes are present.  If the palloc
         * request is not large enough to include such padding bytes then
         * we'll get valgrind complaints due to otherwise-harmless fetches
         * of the padding bytes.
         *
         * In addition, force the initial request to be reasonably large
         * so that we don't waste time with lots of trips through this
         * stanza.  BLCKSZ / 2 seems like a good compromise choice.
         */
        state->main_data_bufsz = MAXALIGN(Max(state->main_data_len + extendSize,
                                              BLCKSZ / 2));
        state->main_data = palloc(state->main_data_bufsz);
    }
}

/*
 * Translate the main-data payload of an old-format WAL record into the new
 * format, for the record types whose main-data layout changed.
 *
 * The new-format structs embed block numbers / RelFileNodes that the old
 * format carried only in the block references, so those values are pulled
 * from state->blocks[] as decoded by the reader.  In most cases
 * state->main_data_len is simply overwritten with the new struct size; for
 * the variable-length SP-GiST records the trailing payload is preserved by
 * copying it aside before extendMainData() (which discards the buffer
 * contents on reallocation) is called.  Record types not listed here keep
 * their main data unchanged.
 */
static void convertMainData(XLogReaderState *state, OldXLogRecord *record) {
    RmgrId rmid = record->xl_rmid;
    uint8		info = (record->xl_info & ~XLR_INFO_MASK);
    switch(rmid) {
        case RM_HEAP2_ID:
        {
            /* VISIBLE: new format adds explicit rnode/blkno (taken from
             * block reference 1, the heap page). */
            if ((info & XLOG_HEAP_OPMASK) == XLOG_HEAP2_VISIBLE) {
                xl_old_heap_visible *xlrec = (xl_old_heap_visible *) XLogRecGetData(state);
                xl_heap_visible xlrecNew;
                xlrecNew.rnode = state->blocks[1].rnode;
                xlrecNew.blkno = state->blocks[1].blkno;
                xlrecNew.cutoff_xid = xlrec->cutoff_xid;
                xlrecNew.flags = xlrec->flags;
                extendMainData(state);
                state->main_data_len = sizeof(xl_heap_visible);
                memcpy(state->main_data,&xlrecNew,state->main_data_len);
            }
            break;
        }
        case RM_HEAP_ID:
        {
            /* UPDATE/HOT_UPDATE: new format adds rnode plus old/new block
             * numbers.  With a single block reference the update was
             * same-page, so oldblk == newblk. */
            if (((info & XLOG_HEAP_OPMASK) == XLOG_HEAP_UPDATE) ||
                ((info & XLOG_HEAP_OPMASK) == XLOG_HEAP_HOT_UPDATE)) {
                xl_old_heap_update *xlrec = (xl_old_heap_update *) XLogRecGetData(state);
                xl_heap_update xlrecNew;
                xlrecNew.old_xmax = xlrec->old_xmax;
                xlrecNew.old_offnum = xlrec->old_offnum;
                xlrecNew.old_infobits_set = xlrec->old_infobits_set;
                xlrecNew.flags = xlrec->flags;
                xlrecNew.new_xmax = xlrec->new_xmax;
                xlrecNew.new_offnum = xlrec->new_offnum;
                xlrecNew.newblk = state->blocks[0].blkno;
                if(state->max_block_id == 0){
                    xlrecNew.oldblk = state->blocks[0].blkno;
                } else{
                    xlrecNew.oldblk = state->blocks[1].blkno;
                }
                xlrecNew.rnode = state->blocks[0].rnode;
                extendMainData(state);
                state->main_data_len = sizeof(xl_heap_update);
                memcpy(state->main_data,&xlrecNew,state->main_data_len);
            }
            break;
        }
        case RM_BTREE_ID:
        {
            /* SPLIT_L/SPLIT_R: new format adds orig/right/sibling page
             * numbers; P_NONE when there is no right sibling (block 2). */
            if (info == XLOG_BTREE_SPLIT_L || info == XLOG_BTREE_SPLIT_R) {
                xl_old_btree_split *xlrec = (xl_old_btree_split *) XLogRecGetData(state);
                xl_btree_split xlrecNew;
                xlrecNew.level = xlrec->level;
                xlrecNew.firstrightoff = xlrec->firstrightoff;
                xlrecNew.newitemoff = xlrec->newitemoff;
                xlrecNew.postingoff = xlrec->postingoff;
                xlrecNew.origpagenumber = state->blocks[0].blkno;
                xlrecNew.rightpagenumber = state->blocks[1].blkno;
                if (!XLogRecGetBlockTag(state, 2, NULL, NULL, &xlrecNew.spagenumber))
                    xlrecNew.spagenumber = P_NONE;
                extendMainData(state);
                state->main_data_len = sizeof(xl_btree_split);
                memcpy(state->main_data,&xlrecNew,state->main_data_len);
            }
            break;
        }
        case RM_GIST_ID:
        {
            /* PAGE_SPLIT: new format adds isroot, derived from whether the
             * first split page is the GiST root block. */
            if (info == XLOG_GIST_PAGE_SPLIT) {
                gistoldxlogPageSplit *xlrec = (gistoldxlogPageSplit *) XLogRecGetData(state);
                gistxlogPageSplit xlrecNew;
                xlrecNew.markfollowright = xlrec->markfollowright;
                xlrecNew.npage = xlrec->npage;
                xlrecNew.origleaf = xlrec->origleaf;
                xlrecNew.orignsn = xlrec->orignsn;
                xlrecNew.origrlink = xlrec->origrlink;
                xlrecNew.isroot = false;
                if (xlrec->npage > 0) {
                    if (state->blocks[1].blkno == GIST_ROOT_BLKNO) {
                        xlrecNew.isroot = true;
                    }
                }
                extendMainData(state);
                state->main_data_len = sizeof(gistxlogPageSplit);
                memcpy(state->main_data,&xlrecNew,state->main_data_len);
            }
            break;
        }
        case RM_SPGIST_ID:
        {
            if (info == XLOG_SPGIST_ADD_LEAF) {
                spgoldxlogAddLeaf *xlrec = (spgoldxlogAddLeaf *) XLogRecGetData(state);
                spgxlogAddLeaf xlrecNew;
                xlrecNew.newPage = xlrec->newPage;		/* init dest page? */
                xlrecNew.storesNulls = xlrec->storesNulls;	/* page is in the nulls tree? */
                xlrecNew.offnumLeaf = xlrec->offnumLeaf;	/* offset where leaf tuple gets placed */
                xlrecNew.offnumHeadLeaf = xlrec->offnumHeadLeaf;	/* offset of head tuple in chain, if any */

                xlrecNew.offnumParent = xlrec->offnumParent;	/* where the parent downlink is, if any */
                xlrecNew.nodeI = xlrec->nodeI;
                xlrecNew.blknoLeaf = state->blocks[0].blkno;
                extendMainData(state);
                state->main_data_len = sizeof(spgxlogAddLeaf);
                memcpy(state->main_data,&xlrecNew,state->main_data_len);

            } else if (info == XLOG_SPGIST_MOVE_LEAFS) {
                spgoldxlogMoveLeafs *xlrec = (spgoldxlogMoveLeafs *) XLogRecGetData(state);
                spgxlogMoveLeafs xlrecNew;
                xlrecNew.nMoves = xlrec->nMoves;			/* number of tuples moved from source page */
                xlrecNew.newPage = xlrec->newPage;		/* init dest page? */
                xlrecNew.replaceDead = xlrec->replaceDead;	/* are we replacing a DEAD source tuple? */
                xlrecNew.storesNulls = xlrec->storesNulls;	/* pages are in the nulls tree? */

                /* where the parent downlink is */
                xlrecNew.offnumParent = xlrec->offnumParent;
                xlrecNew.nodeI = xlrec->nodeI;

                xlrecNew.stateSrc = xlrec->stateSrc;

                /* for he3pg */
                xlrecNew.blknoDst = state->blocks[1].blkno;

                /*----------
                 * data follows:
                 *		array of deleted tuple numbers, length nMoves
                 *		array of inserted tuple numbers, length nMoves + 1 or 1
                 *		list of leaf tuples, length nMoves + 1 or 1 (unaligned!)
                 *
                 * Note: if replaceDead is true then there is only one inserted tuple
                 * number and only one leaf tuple in the data, because we are not copying
                 * the dead tuple from the source
                 *----------
                 */
                /* Save the trailing payload: extendMainData() may discard
                 * the buffer contents on reallocation. */
                char* tmp = palloc(state->main_data_len-SizeOfOldSpgxlogMoveLeafs);
                memcpy(tmp,state->main_data+SizeOfOldSpgxlogMoveLeafs,state->main_data_len-SizeOfOldSpgxlogMoveLeafs);
                extendMainData(state);
                memcpy(state->main_data,&xlrecNew,SizeOfSpgxlogMoveLeafs);
                memcpy(state->main_data + SizeOfSpgxlogMoveLeafs, tmp, state->main_data_len-SizeOfOldSpgxlogMoveLeafs);
                state->main_data_len += SizeOfSpgxlogMoveLeafs-SizeOfOldSpgxlogMoveLeafs;
                pfree(tmp);
            } else if (info == XLOG_SPGIST_ADD_NODE) {
                spgoldxlogAddNode *xlrec = (spgoldxlogAddNode *) XLogRecGetData(state);
                spgxlogAddNode xlrecNew;
                xlrecNew.offnum = xlrec->offnum;
                /*
                 * Offset of the new tuple, on the new page (on backup block 1). Invalid,
                 * if we overwrote the old tuple in the original page).
                 */
                xlrecNew.offnumNew = xlrec->offnumNew;
                xlrecNew.newPage = xlrec->newPage;		/* init new page? */

                /*----
                 * Where is the parent downlink? parentBlk indicates which page it's on,
                 * and offnumParent is the offset within the page. The possible values for
                 * parentBlk are:
                 *
                 * 0: parent == original page
                 * 1: parent == new page
                 * 2: parent == different page (blk ref 2)
                 * -1: parent not updated
                 *----
                 */
                xlrecNew.parentBlk = xlrec->parentBlk;
                xlrecNew.offnumParent = xlrec->offnumParent;	/* offset within the parent page */

                xlrecNew.nodeI = xlrec->nodeI;
                xlrecNew.blkno1 = state->blocks[0].blkno;
                xlrecNew.stateSrc = xlrec->stateSrc;
                extendMainData(state);
                state->main_data_len = sizeof(spgxlogAddNode);
                memcpy(state->main_data,&xlrecNew,state->main_data_len);

            } else if (info == XLOG_SPGIST_PICKSPLIT) {
                spgoldxlogPickSplit *xlrec = (spgoldxlogPickSplit *) XLogRecGetData(state);
                spgxlogPickSplit xlrecNew;
                xlrecNew.isRootSplit = xlrec->isRootSplit;

                xlrecNew.nDelete = xlrec->nDelete;		/* n to delete from Src */
                xlrecNew.nInsert = xlrec->nInsert;		/* n to insert on Src and/or Dest */
                xlrecNew.initSrc = xlrec->initSrc;		/* re-init the Src page? */
                xlrecNew.initDest = xlrec->initDest;		/* re-init the Dest page? */

                /* for he3pg */
                xlrecNew.blknoInner = state->blocks[2].blkno;
                /* where to put new inner tuple */
                xlrecNew.offnumInner = xlrec->offnumInner;
                xlrecNew.initInner = xlrec->initInner;		/* re-init the Inner page? */

                xlrecNew.storesNulls = xlrec->storesNulls;	/* pages are in the nulls tree? */

                /* where the parent downlink is, if any */
                xlrecNew.innerIsParent = xlrec->innerIsParent;	/* is parent the same as inner page? */
                xlrecNew.offnumParent = xlrec->offnumParent;
                xlrecNew.nodeI = xlrec->nodeI;

                xlrecNew.stateSrc = xlrec->stateSrc;

                /*----------
                 * data follows:
                 *		array of deleted tuple numbers, length nDelete
                 *		array of inserted tuple numbers, length nInsert
                 *		array of page selector bytes for inserted tuples, length nInsert
                 *		new inner tuple (unaligned!)
                 *		list of leaf tuples, length nInsert (unaligned!)
                 *----------
                 */
                /* Save the trailing payload: extendMainData() may discard
                 * the buffer contents on reallocation. */
                char* tmp = palloc(state->main_data_len-SizeOfOldSpgxlogPickSplit);
                memcpy(tmp,state->main_data+SizeOfOldSpgxlogPickSplit,state->main_data_len-SizeOfOldSpgxlogPickSplit);
                extendMainData(state);
                memcpy(state->main_data,&xlrecNew,SizeOfSpgxlogPickSplit);
                memcpy(state->main_data + SizeOfSpgxlogPickSplit, tmp, state->main_data_len-SizeOfOldSpgxlogPickSplit);
                state->main_data_len += SizeOfSpgxlogPickSplit-SizeOfOldSpgxlogPickSplit;
                pfree(tmp);
            }
            break;
        }
        case RM_BRIN_ID:
        {
            /* INSERT/UPDATE: new format adds block0 (the main revmap/data
             * page, block reference 0). */
            if (info == XLOG_BRIN_INSERT) {
                xl_old_brin_insert *xlrec = (xl_old_brin_insert *) XLogRecGetData(state);
                xl_brin_insert xlrecNew;
                xlrecNew.heapBlk = xlrec->heapBlk;
                /* extra information needed to update the revmap */
                xlrecNew.pagesPerRange = xlrec->pagesPerRange;
                xlrecNew.block0 = state->blocks[0].blkno;
                /* offset number in the main page to insert the tuple to. */
                xlrecNew.offnum = xlrec->offnum;
                extendMainData(state);
                state->main_data_len = sizeof(xl_brin_insert);
                memcpy(state->main_data,&xlrecNew,state->main_data_len);
            } else if ( info == XLOG_BRIN_UPDATE) {
                xl_old_brin_update *xlrec = (xl_old_brin_update *) XLogRecGetData(state);
                xl_brin_update xlrecUpdate;
                xl_old_brin_insert *xlrecInsert = &xlrec->insert;
                xl_brin_insert xlrecNew;
                xlrecNew.heapBlk = xlrecInsert->heapBlk;
                /* extra information needed to update the revmap */
                xlrecNew.pagesPerRange = xlrecInsert->pagesPerRange;
                xlrecNew.block0 = state->blocks[0].blkno;
                /* offset number in the main page to insert the tuple to. */
                xlrecNew.offnum = xlrecInsert->offnum;
                xlrecUpdate.oldOffnum = xlrec->oldOffnum;
                xlrecUpdate.insert = xlrecNew;
                extendMainData(state);
                state->main_data_len = sizeof(xl_brin_update);
                memcpy(state->main_data,&xlrecUpdate,state->main_data_len);
            }
            break;
        }
        default:
            break;
    }
}

XLogRecData *DecodeXLogRecordAssemble(XLogReaderState *state, OldXLogRecord *record,
                                      XLogRecPtr RedoRecPtr, bool doPageWrites,
                                      XLogRecPtr *fpw_lsn, int *num_fpi)
{

    /*
     * Make an rdata chain containing all the data portions of all block
     * references. This includes the data for full-page images. Also append
     * the headers for the block references in the scratch buffer.
     */
    RmgrId rmid = record->xl_rmid;
    uint8 info = record->xl_info;
    *fpw_lsn = InvalidXLogRecPtr;
    int block_id;
    XLogRecord *rechdr = NULL;
    group_total_len = 0;
    grouo_rec_count = 0;
    grouo_rec_cur_count = 0;
    int maxidx = (state->max_block_id < 0 ? 1:state->max_block_id+1);
    bool isDone = false;
    for (block_id = 0; block_id < maxidx; block_id++)
    {
        XLogRecData* rdt;
        uint32 total_len;
        total_len = 0;
        pg_crc32c rdata_crc = 0;
        XLogRecData *rdt_datas_last;
        char	   *scratch;
        // char		linkkey[36];
        groupRecData[grouo_rec_count].next = NULL;
        rdt_datas_last = &groupRecData[grouo_rec_count];

        scratch = hdr_scratch + grouo_rec_count * SINGLE_SCRATCH_SIZE;
        groupRecData[grouo_rec_count].data = scratch;
        /*group_total_len+=HEADER_SCRATCH_SIZE;*/
        grouphead[grouo_rec_count]=(XLogRecord *)scratch;
        /* The record begins with the fixed-size header */
        rechdr = (XLogRecord *)scratch;
        scratch += SizeOfXLogRecord;
        if (state->max_block_id >= 0) {
            DecodedBkpBlock *blkbuf = &state->blocks[block_id];
			if (!blkbuf->in_use)
				continue;
            XLogRecData* bkp_rdatas = g_bkp_rdatas[block_id];

            XLogRecordBlockHeader bkpb;
            XLogRecordBlockImageHeader bimg;
            XLogRecordBlockCompressHeader cbimg = {0};
            bkpb.id = 0;
            bkpb.fork_flags = blkbuf->flags;
            bkpb.data_length = blkbuf->data_len;
            //total_len += bkpb.data_length;
            /* Ok, copy the header to the scratch buffer */
            memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
            scratch += SizeOfXLogRecordBlockHeader;
            if (blkbuf->has_image) {
                bimg.bimg_info = blkbuf->bimg_info;
                bimg.hole_offset = blkbuf->hole_offset;
                bimg.length = blkbuf->bimg_len;
                memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
                scratch += SizeOfXLogRecordBlockImageHeader;
                rdt_datas_last->next = &bkp_rdatas[0];
                rdt_datas_last = rdt_datas_last->next;
                bkp_rdatas[0].data = blkbuf->bkp_image;
                bkp_rdatas[0].len = blkbuf->bimg_len;
                if (bimg.bimg_info & BKPIMAGE_IS_COMPRESSED) {
                    cbimg.hole_length = blkbuf->hole_length;
                    if (bimg.bimg_info & BKPIMAGE_HAS_HOLE) {
                        memcpy(scratch, &cbimg,
                               SizeOfXLogRecordBlockCompressHeader);
                        scratch += SizeOfXLogRecordBlockCompressHeader;
                    }
                }
                total_len += bimg.length;
                *num_fpi += 1;
            }
            if (blkbuf->has_data) {
                rdt_datas_last->next = &bkp_rdatas[1];
                rdt_datas_last = rdt_datas_last->next;
                bkp_rdatas[1].data = blkbuf->data;
                bkp_rdatas[1].len = blkbuf->data_len;
                total_len += blkbuf->data_len;
            }
            memcpy(scratch, &blkbuf->rnode, sizeof(RelFileNode));
            scratch += sizeof(RelFileNode);

            memcpy(scratch, &blkbuf->blkno, sizeof(BlockNumber));
            scratch += sizeof(BlockNumber);
        }

        if (state->record_origin != InvalidRepOriginId) {
            *(scratch++) = (char)XLR_BLOCK_ID_ORIGIN;
            memcpy(scratch, &state->record_origin, sizeof(RepOriginId));
            scratch += sizeof(RepOriginId);
        }

        if (state->toplevel_xid != InvalidTransactionId) {
            *(scratch++) = (char)XLR_BLOCK_ID_TOPLEVEL_XID;
            memcpy(scratch, &state->toplevel_xid, sizeof(TransactionId));
            scratch += sizeof(TransactionId);
        }

        if (state->main_data_len > 0) {
            rdt_datas_last->next = &g_main_data;
            rdt_datas_last = &g_main_data;
            if (isDone == false) {
                convertMainData(state,record);
                g_main_data.data = state->main_data;
                g_main_data.len = state->main_data_len;
                isDone = true;
            }
            if (state->main_data_len > 255) {
                *(scratch++) = (char)XLR_BLOCK_ID_DATA_LONG;
                memcpy(scratch, &state->main_data_len, sizeof(uint32));
                scratch += sizeof(uint32);
            } else {
                *(scratch++) = (char)XLR_BLOCK_ID_DATA_SHORT;
                *(scratch++) = (uint8)state->main_data_len;
            }
            total_len += state->main_data_len;
        }

        rdt_datas_last->next = NULL;
        groupRecData[grouo_rec_count].len = scratch - groupRecData[grouo_rec_count].data;
        total_len += groupRecData[grouo_rec_count].len;
        grouplens[grouo_rec_count] = total_len;

     /*
     * Fill in the fields in the record header. Prev-link is filled in later,
     * once we know where in the WAL the record will be inserted. The CRC does
     * not include the record header yet.
     */
        rechdr->xl_xid = record->xl_xid;
        rechdr->xl_tot_len = total_len;
        rechdr->xl_info = info;
        rechdr->xl_rmid = rmid;
        rechdr->xl_prev = InvalidXLogRecPtr;
        rechdr->xl_crc = rdata_crc;
        rechdr->blocknum = block_id;
        rechdr->mtr = false;

        group_total_len += total_len;
        grouo_rec_count++;
    }
    rechdr->mtr = true;

    return &groupRecData[0];
}
/*
 * Assemble the registered data and buffers into a chain of WAL records,
 * ready for insertion with XLogInsertRecord().
 *
 * Unlike stock PostgreSQL, this variant emits one XLogRecord per registered
 * block (plus a single record when no block is registered at all), tracked
 * through the file-level groupRecData[] / grouphead[] / grouplens[] arrays
 * and the group_total_len / grouo_rec_count counters.  The last record of
 * the group is flagged with mtr = true.  NOTE(review): the precise group /
 * "mtr" redo semantics are defined by the modified xlog headers — confirm
 * against the XLogRecord definition.
 *
 * The record header fields are filled in, except for the xl_prev field. The
 * calculated CRC does not include the record header yet.
 *
 * If there are any registered buffers, and a full-page image was not taken
 * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This
 * signals that the assembled record is only good for insertion on the
 * assumption that the RedoRecPtr and doPageWrites values were up-to-date.
 */
static XLogRecData *
XLogRecordAssemble(RmgrId rmid, uint8 info,
				   XLogRecPtr RedoRecPtr, bool doPageWrites,
				   XLogRecPtr *fpw_lsn, int *num_fpi)
{
	
	/*
	 * Enforce consistency checks for this record if the user asked for them.
	 * Do this at the beginning of this routine so callers of XLogInsert()
	 * can also pass XLR_CHECK_CONSISTENCY directly for a record.
	 */
	if (wal_consistency_checking[rmid])
		info |= XLR_CHECK_CONSISTENCY;

	/*
	 * Make an rdata chain containing all the data portions of all block
	 * references. This includes the data for full-page images. Also append
	 * the headers for the block references in the scratch buffer.
	 */
	*fpw_lsn = InvalidXLogRecPtr;
	int block_id;
	XLogRecord *rechdr = NULL;
	/* Reset the per-group accumulators (file-level shared state). */
	group_total_len = 0;
	grouo_rec_count = 0;		/* NOTE(review): presumably "group_rec_count" */
	/* Always build at least one record, even with no registered blocks. */
	int maxidx = (max_registered_block_id == 0?1:max_registered_block_id);
	for (block_id = 0; block_id < maxidx; block_id++)
	{
		XLogRecData* rdt;
    	uint32 total_len;
		total_len = 0;
		pg_crc32c rdata_crc;
		XLogRecData *rdt_datas_last;
		char	   *scratch;
		/* Start a fresh rdata chain for this group member. */
		groupRecData[grouo_rec_count].next = NULL;
		rdt_datas_last = &groupRecData[grouo_rec_count];
	
		/* Each record gets its own slice of the shared header scratch area. */
		scratch = hdr_scratch + grouo_rec_count * SINGLE_SCRATCH_SIZE;
		groupRecData[grouo_rec_count].data = scratch;
		/*group_total_len+=HEADER_SCRATCH_SIZE;*/
		grouphead[grouo_rec_count]=(XLogRecord *)scratch;
		/* The record begins with the fixed-size header */
		rechdr = (XLogRecord *)scratch;
		scratch += SizeOfXLogRecord;
		if (max_registered_block_id != 0) {
			registered_buffer *regbuf = &registered_buffers[block_id];
					
			bool needs_backup;
			bool needs_data;
			XLogRecordBlockHeader bkpb;
			XLogRecordBlockImageHeader bimg;
			XLogRecordBlockCompressHeader cbimg = {0};
		
			bool is_compressed = false;
			bool include_image;

			/*
			 * Skip unused slots.  NOTE(review): if the *last* slot is unused,
			 * rechdr is left pointing at this never-filled header, and the
			 * final "rechdr->mtr = true" below writes into it instead of the
			 * last emitted record — confirm the callers' registration
			 * pattern makes this impossible.
			 */
			if (!regbuf->in_use)
				continue;
		
			/*
			 * Note: this function can be called multiple times for the same record.
			 * All the modifications we do to the rdata chains below must handle that.
			 */
			/* Determine if this block needs to be backed up */
			if (regbuf->flags & REGBUF_FORCE_IMAGE)
				needs_backup = true;
			else if (regbuf->flags & REGBUF_NO_IMAGE)
				needs_backup = false;
			else if (!doPageWrites)
				needs_backup = false;
			else
			{
				/*
				 * We assume page LSN is first data on *every* page that can be
				 * passed to XLogInsert, whether it has the standard page layout
				 * or not.
				 */
				XLogRecPtr page_lsn = PageGetLSN(regbuf->page);

				needs_backup = (page_lsn <= RedoRecPtr);
				if (!needs_backup)
				{
					if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
						*fpw_lsn = page_lsn;
				}
			}

			/* Determine if the buffer data needs to included */
			if (regbuf->rdata_len == 0)
				needs_data = false;
			else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
				needs_data = true;
			else
				needs_data = !needs_backup;

			bkpb.id = 0;
			bkpb.fork_flags = regbuf->forkno;
			bkpb.data_length = 0;

			if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
				bkpb.fork_flags |= BKPBLOCK_WILL_INIT;

			/*
			 * If needs_backup is true or WAL checking is enabled for current
			 * resource manager, log a full-page write for the current block.
			 */
			include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;

			if (include_image)
			{
				Page page = regbuf->page;
				uint16 compressed_len = 0;

				/*
				 * The page needs to be backed up, so calculate its hole length
				 * and offset.
				 */
				if (regbuf->flags & REGBUF_STANDARD)
				{
					/* Assume we can omit data between pd_lower and pd_upper */
					uint16 lower = ((PageHeader)page)->pd_lower;
					uint16 upper = ((PageHeader)page)->pd_upper;

					if (lower >= SizeOfPageHeaderData &&
						upper > lower &&
						upper <= BLCKSZ)
					{
						bimg.hole_offset = lower;
						cbimg.hole_length = upper - lower;
					}
					else
					{
						/* No "hole" to remove */
						bimg.hole_offset = 0;
						cbimg.hole_length = 0;
					}
				}
				else
				{
					/* Not a standard page header, don't try to eliminate "hole" */
					bimg.hole_offset = 0;
					cbimg.hole_length = 0;
				}


				/*
				 * Try to compress a block image if wal_compression is enabled
				 */
				if (wal_compression)
				{
					is_compressed =
						XLogCompressBackupBlock(page, bimg.hole_offset,
												cbimg.hole_length,
												regbuf->compressed_page,
												&compressed_len);
				}

				/*
				 * Fill in the remaining fields in the XLogRecordBlockHeader
				 * struct
				 */
				bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;

				/* Report a full page image constructed for the WAL record */
				*num_fpi += 1;

				/*
				 * Construct XLogRecData entries for the page content.
				 */
				rdt_datas_last->next = &regbuf->bkp_rdatas[0];
				rdt_datas_last = rdt_datas_last->next;

				bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;

				/*
				 * If WAL consistency checking is enabled for the resource manager
				 * of this WAL record, a full-page image is included in the record
				 * for the block modified. During redo, the full-page is replayed
				 * only if BKPIMAGE_APPLY is set.
				 */
				if (needs_backup)
					bimg.bimg_info |= BKPIMAGE_APPLY;

				if (is_compressed)
				{
					bimg.length = compressed_len;
					bimg.bimg_info |= BKPIMAGE_IS_COMPRESSED;

					rdt_datas_last->data = regbuf->compressed_page;
					rdt_datas_last->len = compressed_len;
				}
				else
				{
					bimg.length = BLCKSZ - cbimg.hole_length;

					if (cbimg.hole_length == 0)
					{
						rdt_datas_last->data = page;
						rdt_datas_last->len = BLCKSZ;
					}
					else
					{
						/* must skip the hole */
						rdt_datas_last->data = page;
						rdt_datas_last->len = bimg.hole_offset;

						rdt_datas_last->next = &regbuf->bkp_rdatas[1];
						rdt_datas_last = rdt_datas_last->next;

						rdt_datas_last->data =
							page + (bimg.hole_offset + cbimg.hole_length);
						rdt_datas_last->len =
							BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
					}
				}

				total_len += bimg.length;
			}

			if (needs_data)
			{
				/*
				 * Link the caller-supplied rdata chain for this buffer to the
				 * overall list.
				 */
				bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
				bkpb.data_length = regbuf->rdata_len;
				total_len += regbuf->rdata_len;

				rdt_datas_last->next = regbuf->rdata_head;
				rdt_datas_last = regbuf->rdata_tail;
			}



			/* Ok, copy the header to the scratch buffer */
			memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
			scratch += SizeOfXLogRecordBlockHeader;
			if (include_image)
			{
				memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
				scratch += SizeOfXLogRecordBlockImageHeader;
				if (cbimg.hole_length != 0 && is_compressed)
				{
					memcpy(scratch, &cbimg,
						   SizeOfXLogRecordBlockCompressHeader);
					scratch += SizeOfXLogRecordBlockCompressHeader;
				}
			}

			/*
			 * Unlike stock PostgreSQL, the full RelFileNode and block number
			 * are written unconditionally (no same-rel compression).
			 */
			memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
			scratch += sizeof(RelFileNode);

			memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
			scratch += sizeof(BlockNumber);
		}

		/* followed by the record's origin, if any */
		if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
			replorigin_session_origin != InvalidRepOriginId)
		{
			*(scratch++) = (char)XLR_BLOCK_ID_ORIGIN;
			memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
			scratch += sizeof(replorigin_session_origin);
		}

		/* followed by toplevel XID, if not already included in previous record */
		if (IsSubTransactionAssignmentPending())
		{
			TransactionId xid = GetTopTransactionIdIfAny();

			/* update the flag (later used by XLogResetInsertion) */
			XLogSetRecordFlags(XLOG_INCLUDE_XID);

			*(scratch++) = (char)XLR_BLOCK_ID_TOPLEVEL_XID;
			memcpy(scratch, &xid, sizeof(TransactionId));
			scratch += sizeof(TransactionId);
		}

		/* followed by main data, if any */
		if (mainrdata_len > 0)
		{
			if (mainrdata_len > 255)
			{
				*(scratch++) = (char)XLR_BLOCK_ID_DATA_LONG; 
				memcpy(scratch, &mainrdata_len, sizeof(uint32));
				scratch += sizeof(uint32);
			}
			else
			{
				*(scratch++) = (char)XLR_BLOCK_ID_DATA_SHORT;
				*(scratch++) = (uint8)mainrdata_len;
			}
			/*
			 * NOTE(review): the main-data chain and its length are appended
			 * to *every* record of the group, not just the last one —
			 * confirm redo expects the duplication.
			 */
			rdt_datas_last->next = mainrdata_head;
			rdt_datas_last = mainrdata_last;
			total_len += mainrdata_len;
		}
	
		rdt_datas_last->next = NULL;
		/* Header portion = everything written into this scratch slice. */
		groupRecData[grouo_rec_count].len = scratch - groupRecData[grouo_rec_count].data;
		total_len += groupRecData[grouo_rec_count].len;
		//grouplens[grouo_rec_count]=MAXALIGN(total_len);
		grouplens[grouo_rec_count]=total_len;
   
	     /*
	      * NOTE(review): an earlier revision MAXALIGN'ed total_len here (see
	      * the commented-out code) because CopyXLogRecordToWAL appears to
	      * require freespace >= sizeof(uint32); confirm that dropping the
	      * alignment is intentional.
	      */
		  /*total_len=MAXALIGN(total_len);*/
	     
			/*
		 * Calculate CRC of the data
		 *
		 * Note that the record header isn't added into the CRC initially since we
		 * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
		 * the whole record in the order: rdata, then backup blocks, then record
		 * header.
		 */

		INIT_CRC32C(rdata_crc);

		COMP_CRC32C(rdata_crc, groupRecData[grouo_rec_count].data + SizeOfXLogRecord, groupRecData[grouo_rec_count].len - SizeOfXLogRecord);
		
		rdt = groupRecData[grouo_rec_count].next;
		
		for (; rdt != NULL; rdt = rdt->next)
			COMP_CRC32C(rdata_crc, rdt->data, rdt->len);

		/*
	 * Fill in the fields in the record header. Prev-link is filled in later,
	 * once we know where in the WAL the record will be inserted. The CRC does
	 * not include the record header yet.
	 */
		rechdr->xl_xid = GetCurrentTransactionIdIfAny();
		rechdr->xl_tot_len = total_len;
		rechdr->xl_info = info;
		rechdr->xl_rmid = rmid;
		rechdr->xl_prev = InvalidXLogRecPtr;
		rechdr->xl_crc = rdata_crc;
		rechdr->blocknum=block_id;
		rechdr->mtr = false;

		group_total_len += total_len;
		grouo_rec_count++;
	}
	/* Flag the last assembled record as the end of the group ("mtr"). */
	rechdr->mtr = true;

	return &groupRecData[0];
}


/*
 * Create a compressed version of a backup block image.
 *
 * Returns false if compression fails (i.e., compressed result is actually
 * bigger than original). Otherwise, returns true and sets 'dlen' to
 * the length of compressed block image.
 */
static bool
XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
						char *dest, uint16 *dlen)
{
	int32		uncompressed_size = BLCKSZ - hole_length;
	int32		compressed_size;
	int32		header_overhead = 0;
	char	   *input;
	PGAlignedBlock workspace;

	if (hole_length == 0)
		input = page;
	else
	{
		/* Squeeze the hole out of the page before compressing. */
		input = workspace.data;
		memcpy(input, page, hole_offset);
		memcpy(input + hole_offset,
			   page + (hole_offset + hole_length),
			   BLCKSZ - (hole_offset + hole_length));

		/*
		 * A compressed image with a hole needs an extra header in the WAL
		 * record; account for it when judging whether compression pays off.
		 */
		header_overhead = SizeOfXLogRecordBlockCompressHeader;
	}

	/*
	 * Even when pglz_compress() reports success, accept the result only if
	 * the bytes saved exceed the extra header the compressed form needs.
	 */
	compressed_size = pglz_compress(input, uncompressed_size, dest,
									PGLZ_strategy_default);
	if (compressed_size >= 0 &&
		compressed_size + header_overhead < uncompressed_size)
	{
		*dlen = (uint16) compressed_size;
		return true;
	}
	return false;
}

/*
 * Determine whether the buffer referenced has to be backed up.
 *
 * Since we don't yet have the insert lock, fullPageWrites and forcePageWrites
 * could change later, so the result should be used for optimization purposes
 * only.
 */
bool
XLogCheckBufferNeedsBackup(Buffer buffer)
{
	XLogRecPtr	redo_ptr;
	bool		page_writes_enabled;

	GetFullPageWriteInfo(&redo_ptr, &page_writes_enabled);

	/* No full-page writes, no backup needed. */
	if (!page_writes_enabled)
		return false;

	/* Back up the page if it hasn't been WAL-logged since the redo point. */
	return PageGetLSN(BufferGetPage(buffer)) <= redo_ptr;
}

/*
 * Write a backup block if needed when we are setting a hint. Note that
 * this may be called for a variety of page types, not just heaps.
 *
 * Callable while holding just share lock on the buffer content.
 *
 * We can't use the plain backup block mechanism since that relies on the
 * Buffer being exclusively locked. Since some modifications (setting LSN, hint
 * bits) are allowed in a sharelocked buffer that can lead to wal checksum
 * failures. So instead we copy the page and insert the copied data as normal
 * record data.
 *
 * We only need to do something if page has not yet been full page written in
 * this checkpoint round. The LSN of the inserted wal record is returned if we
 * had to write, InvalidXLogRecPtr otherwise.
 *
 * It is possible that multiple concurrent backends could attempt to write WAL
 * records. In that case, multiple copies of the same block would be recorded
 * in separate WAL records by different backends, though that is still OK from
 * a correctness perspective.
 */
XLogRecPtr
XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
{
	XLogRecPtr	result = InvalidXLogRecPtr;
	XLogRecPtr	redo_ptr;
	XLogRecPtr	page_lsn;

	/* A concurrent checkpoint must not move RedoRecPtr under us. */
	Assert(MyProc->delayChkpt);

	/* Refresh RedoRecPtr so the backup decision is based on current state. */
	redo_ptr = GetRedoRecPtr();

	/*
	 * The page LSN is the first data on every page passed to XLogInsert.
	 * Holding only a share lock, we must read it via the atomic accessor.
	 */
	page_lsn = BufferGetLSNAtomic(buffer);

	if (page_lsn <= redo_ptr)
	{
		int			regflags = REGBUF_FORCE_IMAGE;
		PGAlignedBlock snapshot;
		char	   *src = (char *) BufferGetBlock(buffer);
		RelFileNode rnode;
		ForkNumber	forkno;
		BlockNumber blkno;

		/*
		 * Snapshot the page so concurrent hint-bit or LSN updates cannot
		 * race with the WAL copy.  pd_lower/pd_upper cannot move without an
		 * exclusive lock, so leaving out the hole is safe.
		 */
		if (buffer_std)
		{
			Page		page = BufferGetPage(buffer);
			uint16		lower = ((PageHeader) page)->pd_lower;
			uint16		upper = ((PageHeader) page)->pd_upper;

			memcpy(snapshot.data, src, lower);
			memcpy(snapshot.data + upper, src + upper, BLCKSZ - upper);
			regflags |= REGBUF_STANDARD;
		}
		else
			memcpy(snapshot.data, src, BLCKSZ);

		XLogBeginInsert();

		BufferGetTag(buffer, &rnode, &forkno, &blkno);
		XLogRegisterBlock(0, &rnode, forkno, blkno, snapshot.data, regflags);

		result = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
	}

	return result;
}

/*
 * Write a WAL record containing a full image of a page. Caller is responsible
 * for writing the page to disk after calling this routine.
 *
 * Note: If you're using this function, you should be building pages in private
 * memory and writing them directly to smgr.  If you're using buffers, call
 * log_newpage_buffer instead.
 *
 * If the page follows the standard page layout, with a PageHeader and unused
 * space between pd_lower and pd_upper, set 'page_std' to true. That allows
 * the unused space to be left out from the WAL record, making it smaller.
 */
XLogRecPtr
log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno,
			Page page, bool page_std)
{
	XLogRecPtr	recptr;
	int			regflags = REGBUF_FORCE_IMAGE;

	if (page_std)
		regflags |= REGBUF_STANDARD;

	XLogBeginInsert();
	XLogRegisterBlock(0, rnode, forkNum, blkno, page, regflags);
	recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);

	/* Setting an LSN on a still-uninitialized page would corrupt it. */
	if (!PageIsNew(page))
		PageSetLSN(page, recptr);

	return recptr;
}

/*
 * Like log_newpage(), but allows logging multiple pages in one operation.
 * It is more efficient than calling log_newpage() for each page separately,
 * because we can write multiple pages in a single WAL record.
 */
void
log_newpages(RelFileNode *rnode, ForkNumber forkNum, int num_pages,
			 BlockNumber *blknos, Page *pages, bool page_std)
{
	int			regflags = REGBUF_FORCE_IMAGE;
	int			done = 0;

	if (page_std)
		regflags |= REGBUF_STANDARD;

	/*
	 * Pages are logged in batches of XLR_MAX_BLOCK_ID block references,
	 * one WAL record per batch.  Make sure the insertion machinery can
	 * hold a full batch.
	 */
	XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);

	while (done < num_pages)
	{
		XLogRecPtr	recptr;
		int			first = done;
		int			slot = 0;
		int			k;

		XLogBeginInsert();

		while (slot < XLR_MAX_BLOCK_ID && done < num_pages)
		{
			XLogRegisterBlock(slot, rnode, forkNum, blknos[done], pages[done],
							  regflags);
			slot++;
			done++;
		}

		recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);

		for (k = first; k < done; k++)
		{
			/* Setting an LSN on a still-uninitialized page would corrupt it. */
			if (!PageIsNew(pages[k]))
				PageSetLSN(pages[k], recptr);
		}
	}
}

/*
 * Write a WAL record containing a full image of a page.
 *
 * Caller should initialize the buffer and mark it dirty before calling this
 * function.  This function will set the page LSN.
 *
 * If the page follows the standard page layout, with a PageHeader and unused
 * space between pd_lower and pd_upper, set 'page_std' to true. That allows
 * the unused space to be left out from the WAL record, making it smaller.
 */
XLogRecPtr
log_newpage_buffer(Buffer buffer, bool page_std)
{
	RelFileNode rnode;
	ForkNumber	fork;
	BlockNumber blockno;

	/* Shared buffers should be modified in a critical section. */
	Assert(CritSectionCount > 0);

	BufferGetTag(buffer, &rnode, &fork, &blockno);

	return log_newpage(&rnode, fork, blockno, BufferGetPage(buffer), page_std);
}

/*
 * WAL-log a range of blocks in a relation.
 *
 * An image of all pages with block numbers 'startblk' <= X < 'endblk' is
 * written to the WAL. If the range is large, this is done in multiple WAL
 * records.
 *
 * If all page follows the standard page layout, with a PageHeader and unused
 * space between pd_lower and pd_upper, set 'page_std' to true. That allows
 * the unused space to be left out from the WAL records, making them smaller.
 *
 * NOTE: This function acquires exclusive-locks on the pages. Typically, this
 * is used on a newly-built relation, and the caller is holding a
 * AccessExclusiveLock on it, so no other backend can be accessing it at the
 * same time. If that's not the case, you must ensure that this does not
 * cause a deadlock through some other means.
 */
void
log_newpage_range(Relation rel, ForkNumber forkNum,
				  BlockNumber startblk, BlockNumber endblk,
				  bool page_std)
{
	int			regflags = REGBUF_FORCE_IMAGE;
	BlockNumber cur;

	if (page_std)
		regflags |= REGBUF_STANDARD;

	/*
	 * Pages are logged in batches of XLR_MAX_BLOCK_ID block references,
	 * one WAL record per batch.  Make sure the insertion machinery can
	 * hold a full batch.
	 */
	XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);

	cur = startblk;
	while (cur < endblk)
	{
		Buffer		batch[XLR_MAX_BLOCK_ID];
		XLogRecPtr	recptr;
		int			nheld;
		int			k;

		CHECK_FOR_INTERRUPTS();

		/* Gather and exclusive-lock the next batch of non-empty pages. */
		nheld = 0;
		while (nheld < XLR_MAX_BLOCK_ID && cur < endblk)
		{
			Buffer		buf = ReadBufferExtended(rel, forkNum, cur,
												 RBM_NORMAL, NULL);

			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

			/*
			 * Completely empty pages stay unlogged: writing a WAL record
			 * would change the LSN, and we want the page to remain empty.
			 */
			if (PageIsNew(BufferGetPage(buf)))
				UnlockReleaseBuffer(buf);
			else
				batch[nheld++] = buf;
			cur++;
		}

		/* Emit one WAL record covering the whole batch. */
		XLogBeginInsert();

		START_CRIT_SECTION();
		for (k = 0; k < nheld; k++)
		{
			XLogRegisterBuffer(k, batch[k], regflags);
			MarkBufferDirty(batch[k]);
		}

		recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);

		for (k = 0; k < nheld; k++)
		{
			PageSetLSN(BufferGetPage(batch[k]), recptr);
			UnlockReleaseBuffer(batch[k]);
		}
		END_CRIT_SECTION();
	}
}

/*
 * Allocate working buffers needed for WAL record construction.
 */
void
InitXLogInsert(void)
{
	/* Memory context that owns all WAL-construction working state. */
	if (xloginsert_cxt == NULL)
		xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
											   "WAL record construction",
											   ALLOCSET_DEFAULT_SIZES);

	/* Slots for block references registered with XLogRegisterBuffer(). */
	if (registered_buffers == NULL)
	{
		registered_buffers = (registered_buffer *)
			MemoryContextAllocZero(xloginsert_cxt,
								   sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
		max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
	}

	/* Pool of XLogRecData entries for caller-supplied data chains. */
	if (rdatas == NULL)
	{
		rdatas = MemoryContextAlloc(xloginsert_cxt,
									sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
		max_rdatas = XLR_NORMAL_RDATAS;
	}

	/* Scratch space for assembling WAL record headers. */
	if (hdr_scratch == NULL)
		hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
											 HEADER_SCRATCH_SIZE);
}
